{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "extreme-electron",
   "metadata": {},
   "source": [
    "# spark-ts-trends"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "54f97720-7776-4e11-859e-95371afd3dad",
   "metadata": {},
   "source": [
    "Computes trends of time series by fitting a low order polinomial\n",
    "\n",
    "WARNING: This component currently only supports local execution (not Kubeflow/Airflow)  \n",
    "WARNING: This component currently only supports copernicus climate data\n",
    "\n",
    "\n",
    "Future work    \n",
    "[ ] Generalize component  \n",
    "[ ] Make component run on KubeFlow/Airflow pipelines"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "portable-cleaners",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "export version=`python --version |awk '{print $2}' |awk -F\".\" '{print $1$2}'`\n",
    "\n",
    "echo $version\n",
    "\n",
    "if [ $version == '36' ] || [ $version == '37' ]; then\n",
    "    echo 'Starting installation...'\n",
    "    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log\n",
    "    if [ $? == 0 ]; then\n",
    "        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'\n",
    "    else\n",
    "        echo 'Installation failed, please check log:'\n",
    "        cat install.log\n",
    "    fi\n",
    "elif [ $version == '38' ] || [ $version == '39' ]; then\n",
    "    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log\n",
    "    if [ $? == 0 ]; then\n",
    "        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'\n",
    "    else\n",
    "        echo 'Installation failed, please check log:'\n",
    "        cat install.log\n",
    "    fi\n",
    "else\n",
    "    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'\n",
    "    exit -1\n",
    "fi"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "welsh-intention",
   "metadata": {},
   "outputs": [],
   "source": [
    "import wget\n",
    "wget.download(\n",
    "    'https://raw.githubusercontent.com/IBM/claimed/master/component-library/claimed_utils.py'\n",
    ")\n",
    "from claimed_utils import parse_args_to_parameters\n",
    "import numpy as np\n",
    "import os\n",
    "import pandas as pd\n",
    "import pickle\n",
    "from pyspark import SparkConf\n",
    "from pyspark import SparkContext\n",
    "from pyspark.sql import SparkSession\n",
    "import shutil"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "brown-estate",
   "metadata": {},
   "outputs": [],
   "source": [
    "# data_parquet path and parquet file name (default: data.parquet)\n",
    "data_parquet = os.environ.get('data_parquet', 'data.parquet')\n",
    "\n",
    "# @param master url of master (default: local mode)\n",
    "master = os.environ.get('master', \"local[*]\")\n",
    "\n",
    "# data_dir temporal data storage for local execution\n",
    "data_dir = os.environ.get('data_dir', '../../data/')\n",
    "\n",
    "# output_result_filename parquet file name of result (default: trends.parquet)\n",
    "output_result_filename = os.environ.get('output_result_filename', 'trends.parquet')\n",
    "\n",
    "parse_args_to_parameters()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fallen-dallas",
   "metadata": {},
   "outputs": [],
   "source": [
    "config = SparkConf().setMaster(master).setAll([\n",
    "    ('spark.executor.memory', '8g'),\n",
    "    ('spark.driver.memory', '8g'),\n",
    "    ('spark.sql.execution.arrow.pyspark.enabled', 'true')\n",
    "])\n",
    "sc = SparkContext.getOrCreate(config)\n",
    "spark = SparkSession.builder.getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "attended-artist",
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.parquet(data_dir + data_parquet)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "impossible-august",
   "metadata": {},
   "outputs": [],
   "source": [
    "df.createOrReplaceTempView('df')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "indirect-coach",
   "metadata": {},
   "outputs": [],
   "source": [
    "if os.path.exists(data_dir + 'lon_lat.p'):\n",
    "    lon_lat = pickle.load(open(data_dir + \"lon_lat.p\", \"rb\"))\n",
    "else:\n",
    "    lon_lat = spark.sql('''\n",
    "        select lon, lat from df where group by lon, lat\n",
    "    ''').rdd.map(lambda x: [x.lon, x.lat]).collect()\n",
    "    pickle.dump(lon_lat, open(data_dir + \"lon_lat.p\", \"wb\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "afraid-helicopter",
   "metadata": {},
   "outputs": [],
   "source": [
    "if os.path.exists(data_dir + 'lon_lat_slope.p'):\n",
    "    lon_lat_slope = pickle.load(open(data_dir + \"lon_lat_slope.p\", \"rb\"))\n",
    "else:\n",
    "    lon_lat_slope = dict()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "muslim-tattoo",
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "for index in range(len(lon_lat)):\n",
    "    if not lon_lat[index][0] + ':' + lon_lat[index][1] in lon_lat_slope:\n",
    "        sm = spark.sql(\"select sm from df where sm <> 'null' and lat='\" \n",
    "                       + lon_lat[index][0]\n",
    "                       + \"' and lon='\"\n",
    "                       + lon_lat[index][1]\n",
    "                       + \"' order by time asc\").collect()\n",
    "        sm = np.array(list(map(lambda x: x.sm, sm)))\n",
    "        sm = sm.astype(float)\n",
    "        if len(sm) > 0:\n",
    "            try:\n",
    "                coefficients, residuals, _, _, _ = np.polyfit(\n",
    "                    range(len(sm)), sm, 1, full=True)\n",
    "                mse = residuals[0] / (len(sm))\n",
    "                nrmse = np.sqrt(mse) / (sm.max() - sm.min())\n",
    "                print('Slope ' + str(coefficients[1]))\n",
    "                print('NRMSE: ' + str(nrmse))\n",
    "                lon_lat_slope[lon_lat[index][0] + \":\" + lon_lat[index][1]] =\n",
    "                [coefficients[1]]\n",
    "                pickle.dump(lon_lat_slope, open(\n",
    "                    data_dir + \"lon_lat_slope.p\", \"wb\"))\n",
    "            except ValueError:\n",
    "                print(\"Could not convert data\")\n",
    "            except Exception:\n",
    "                print('skipping, Generic Error')\n",
    "        else:\n",
    "            print('skipping, empty data')\n",
    "    else:\n",
    "        print('already processed')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "respected-oxide",
   "metadata": {},
   "outputs": [],
   "source": [
    "lon_lat_slope = pickle.load(open(data_dir + \"lon_lat_slope.p\", \"rb\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "american-style",
   "metadata": {},
   "outputs": [],
   "source": [
    "result = np.array([])\n",
    "for k, v in lon_lat_slope.items():\n",
    "    lon_lat = k.split(':')\n",
    "    lon = lon_lat[0]\n",
    "    lat = lon_lat[1]\n",
    "    slope = lon_lat_slope[k][0]\n",
    "    result = np.append(result, [lon, lat, slope])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "exclusive-industry",
   "metadata": {},
   "outputs": [],
   "source": [
    "result = result.reshape(int(len(result) / 3), 3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "consecutive-gambling",
   "metadata": {},
   "outputs": [],
   "source": [
    "result = pd.DataFrame(result, columns=['lon', 'lat', 'trend'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "gentle-organization",
   "metadata": {},
   "outputs": [],
   "source": [
    "result_df = spark.createDataFrame(result)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "employed-denmark",
   "metadata": {},
   "outputs": [],
   "source": [
    "if os.path.exists(data_dir + output_result_filename):\n",
    "    shutil.rmtree(data_dir + output_result_filename)\n",
    "result_df.write.parquet(data_dir + output_result_filename)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
